KafkaRDD

KafkaRDD is an RDD of Kafka's ConsumerRecords from topics in Apache Kafka. It has support for HasOffsetRanges.

Note
Kafka’s ConsumerRecord holds a topic name, a partition number, the offset of the record in the Kafka partition and the record itself (as a key-value pair).
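A minimal sketch of reading those pieces of information off a ConsumerRecord (the helper method and its output format are made up for illustration):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// A sketch: the information a ConsumerRecord carries.
def describe(record: ConsumerRecord[String, String]): String =
  s"topic=${record.topic} partition=${record.partition} " +
    s"offset=${record.offset} key=${record.key} value=${record.value}"
```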

KafkaRDD uses KafkaRDDPartition for its partitions. Each partition knows its preferred location as the host of the topic (not the port, however) and so neatly maps an RDD partition to a Kafka topic partition.

Note
KafkaRDD is a private[spark] class.

KafkaRDD overrides the methods of the RDD class so they are based on offsetRanges, i.e. the partitions (see the sketch below).
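A simplified sketch (not the exact Spark source) of how the offset ranges could become RDD partitions inside KafkaRDD; the OffsetRange fields are the real API, while the KafkaRDDPartition constructor shown is an assumption for illustration:

```scala
import org.apache.spark.Partition

// Inside KafkaRDD: every OffsetRange becomes exactly one RDD partition.
override def getPartitions: Array[Partition] =
  offsetRanges.zipWithIndex.map { case (range, index) =>
    new KafkaRDDPartition(index, range.topic, range.partition,
      range.fromOffset, range.untilOffset)
  }.toArray
```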

You can create a KafkaRDD using KafkaUtils.createRDD, or a DStream of KafkaRDDs (i.e. a DirectKafkaInputDStream) using KafkaUtils.createDirectStream, as in the sketch below.
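A minimal sketch of creating a KafkaRDD with KafkaUtils.createRDD from the spark-streaming-kafka-0-10 module; the broker address, topic name, group id and offsets are made-up values for illustration:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import scala.collection.JavaConverters._

val sc = new SparkContext("local[*]", "kafka-rdd-demo")

// Kafka consumer configuration (values are examples only)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-rdd-demo"
).asJava

// One OffsetRange per RDD partition: topic, partition, fromOffset, untilOffset
val offsetRanges = Array(OffsetRange("my-topic", 0, 0L, 100L))

val rdd: RDD[ConsumerRecord[String, String]] =
  KafkaUtils.createRDD[String, String](
    sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)
```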

Tip

Enable INFO logging level for org.apache.spark.streaming.kafka010.KafkaRDD logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.KafkaRDD=INFO

Refer to Logging.

Computing KafkaRDDPartition (in TaskContext) — compute Method

compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]]
Note
compute is a part of the RDD Contract.

compute assumes that it works with thePart as a KafkaRDDPartition only. It asserts that the offsets are correct, i.e. fromOffset is at most untilOffset.

If the beginning and ending offsets are the same, you should see the following INFO message in the logs and compute returns an empty iterator.

INFO KafkaRDD: Beginning offset [fromOffset] is the same as ending offset skipping [topic] [partition]

Otherwise, when the beginning and ending offsets are different, a KafkaRDDIterator is created (for the partition and the input TaskContext) and returned.
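Putting these steps together, a simplified sketch of compute (not the exact Spark source; the error message and the KafkaRDDIterator constructor arguments are abbreviated):

```scala
override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
  // compute works with KafkaRDDPartitions only
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // the offsets must be correct: fromOffset is at most untilOffset
  require(part.fromOffset <= part.untilOffset, "Beginning offset is after the ending offset")
  if (part.fromOffset == part.untilOffset) {
    // nothing to read from this topic partition
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    // read records from fromOffset (inclusive) up to untilOffset (exclusive)
    new KafkaRDDIterator(part, context)
  }
}
```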

KafkaRDDPartition

KafkaRDDPartition is…​FIXME
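Until the above is filled in, a rough sketch of what KafkaRDDPartition carries, based on how it is used on this page; the exact fields and their names are an assumption:

```scala
import org.apache.spark.Partition

// A sketch (not the exact Spark source): one RDD partition per Kafka
// topic partition, bounded by a [fromOffset, untilOffset) range.
class KafkaRDDPartition(
    val index: Int,
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Partition {

  // number of records covered by this partition
  def count(): Long = untilOffset - fromOffset
}
```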
